package com.shujia.stream

import org.apache.flink.table.api.{EnvironmentSettings, TableEnvironment}

/**
 * Flink SQL job that copies the `dim_card` table of the `dim` database in the
 * Hive catalog into the MySQL table `dim_card` through the JDBC connector.
 *
 * The job is safe to re-run: the JDBC sink table definition is created with
 * `IF NOT EXISTS`, because the definition is registered in the Hive catalog
 * and would otherwise make a second run fail with "table already exists".
 */
object DImCard {

  /**
   * Entry point: builds the table environment, registers the Hive catalog,
   * declares the JDBC sink table and submits the INSERT statement.
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    // Environment settings: run the planner in streaming mode.
    val settings: EnvironmentSettings = EnvironmentSettings
      .newInstance()
      .inStreamingMode()
      .build()

    // Flink SQL entry point used for every statement below.
    val table: TableEnvironment = TableEnvironment.create(settings)

    // Register the Hive catalog, configured from the local Hive conf directory.
    table.executeSql(
      """
        | CREATE CATALOG hive_catalog WITH (
        |  'type' = 'hive',
        |  'hive-conf-dir' = '/usr/local/soft/hive-1.2.1/conf'
        |)
        |""".stripMargin)

    // Make the Hive catalog the current catalog.
    table.executeSql(
      """
        |use catalog hive_catalog
        |
        |""".stripMargin)

    // Make `dim` the current database; unqualified table names resolve here.
    table.executeSql(
      """
        |use dim
        |
        |""".stripMargin)

    // Declare the MySQL sink table.
    // IF NOT EXISTS: the current catalog is the Hive catalog, so this
    // definition outlives the session; a plain CREATE TABLE fails on re-run.
    // NOTE(review): credentials are hard-coded in the DDL — consider loading
    // them from configuration instead.
    table.executeSql(
      """
        |CREATE TABLE IF NOT EXISTS dim_card_mysql (
        |  card BIGINT,
        |  lon DOUBLE,
        |  lat DOUBLE,
        |  PRIMARY KEY (card) NOT ENFORCED
        |) WITH (
        |   'connector' = 'jdbc',
        |   'url' = 'jdbc:mysql://master:3306/car',
        |   'table-name' = 'dim_card',
        |   'username' = 'root',
        |   'password' = '123456'
        |)
        |""".stripMargin)

    // Copy all rows of dim_card into the MySQL sink.
    // NOTE(review): `select *` maps columns by position — assumes dim_card is
    // exactly (card, lon, lat) in that order; confirm against its schema.
    // NOTE(review): the result of executeSql is not awaited here; if a local
    // run exits before the job finishes, consider calling `.await()` — confirm.
    table.executeSql(
      """
        |insert into dim_card_mysql
        |select * from dim_card
        |""".stripMargin)

  }

}
